{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# AML Pipeline with AdlaStep\n",
        "\n",
        "This notebook demonstrates how to use AdlaStep in AML Pipelines. [AdlaStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py) runs U-SQL scripts on the Azure Data Lake Analytics service."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## AML and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "from msrest.exceptions import HttpOperationError\n",
        "\n",
        "import azureml.core\n",
        "from azureml.exceptions import ComputeTargetException\n",
        "from azureml.core import Workspace, Experiment\n",
        "from azureml.core.compute import ComputeTarget, AdlaCompute\n",
        "from azureml.core.datastore import Datastore\n",
        "from azureml.data.data_reference import DataReference\n",
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "from azureml.pipeline.steps import AdlaStep\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](https://aka.ms/pl-config) first if you haven't already."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "create workspace"
        ]
      },
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Attach ADLA account to workspace\n",
        "\n",
        "To submit jobs to the Azure Data Lake Analytics service, you must first attach your ADLA account to the workspace. You'll need to provide the account name and resource group of the ADLA account to complete this step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "sample-adlacompute-attach"
        ]
      },
      "outputs": [],
      "source": [
        "adla_compute_name = 'testadl' # Name to associate with new compute in workspace\n",
        "\n",
        "# ADLA account details needed to attach as compute to workspace\n",
        "adla_account_name = \"<adla_account_name>\" # Name of the Azure Data Lake Analytics account\n",
        "adla_resource_group = \"<adla_resource_group>\" # Name of the resource group which contains this account\n",
        "\n",
        "try:\n",
        "    # check if already attached\n",
        "    adla_compute = AdlaCompute(ws, adla_compute_name)\n",
        "except ComputeTargetException:\n",
        "    print('attaching adla compute...')\n",
        "    attach_config = AdlaCompute.attach_configuration(resource_group=adla_resource_group, account_name=adla_account_name)\n",
        "    adla_compute = ComputeTarget.attach(ws, adla_compute_name, attach_config)\n",
        "    adla_compute.wait_for_completion()\n",
        "\n",
        "print(\"Using ADLA compute:{}\".format(adla_compute.cluster_resource_id))\n",
        "print(\"Provisioning state:{}\".format(adla_compute.provisioning_state))\n",
        "print(\"Provisioning errors:{}\".format(adla_compute.provisioning_errors))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Register Data Lake Storage as Datastore\n",
        "\n",
        "To register Data Lake Storage as a Datastore in the workspace, you'll need account information such as the account name, resource group, and subscription ID.\n",
        "\n",
        "> AdlaStep can only work with data stored in the **default** Data Lake Storage of the Data Lake Analytics account provided above. If the data you need is in a non-default storage account, you can use a DataTransferStep to copy it to the default storage before running the AdlaStep. To find the default storage, open your Data Lake Analytics account in the Azure portal and go to 'Data sources' under Settings in the left pane.\n",
        "\n",
        "### Grant Azure AD application access to Data Lake Storage\n",
        "\n",
        "You'll also need to provide an Azure Active Directory (AAD) application that can access Data Lake Storage. [This document](https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory) contains step-by-step instructions on how to create an AAD application and assign it to Data Lake Storage. A couple of important notes on assigning permissions to the AAD app:\n",
        "\n",
        "- Access should be granted at the root folder level.\n",
        "- In the 'Assign permissions' pane, select Read, Write, and Execute permissions for 'This folder and all children'. Add them as 'An access permission entry and a default permission entry' so that the application can access any new files created in the future."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "datastore_name = 'MyAdlsDatastore' # Name to associate with data store in workspace\n",
        "\n",
        "# ADLS storage account details needed to register as a Datastore\n",
        "subscription_id = os.getenv(\"ADL_SUBSCRIPTION_62\", \"<my-subscription-id>\") # subscription id of ADLS account\n",
        "resource_group = os.getenv(\"ADL_RESOURCE_GROUP_62\", \"<my-resource-group>\") # resource group of ADLS account\n",
        "store_name = os.getenv(\"ADL_STORENAME_62\", \"<my-datastore-name>\") # ADLS account name\n",
        "tenant_id = os.getenv(\"ADL_TENANT_62\", \"<my-tenant-id>\") # tenant id of service principal\n",
        "client_id = os.getenv(\"ADL_CLIENTID_62\", \"<my-client-id>\") # client id of service principal\n",
        "client_st = os.getenv(\"ADL_CLIENT_62_SECRET\", \"<my-client-secret>\") # the secret of service principal\n",
        "\n",
        "try:\n",
        "    adls_datastore = Datastore.get(ws, datastore_name)\n",
        "    print(\"found datastore with name: %s\" % datastore_name)\n",
        "except HttpOperationError:\n",
        "    adls_datastore = Datastore.register_azure_data_lake(\n",
        "        workspace=ws,\n",
        "        datastore_name=datastore_name,\n",
        "        subscription_id=subscription_id, # subscription id of ADLS account\n",
        "        resource_group=resource_group, # resource group of ADLS account\n",
        "        store_name=store_name, # ADLS account name\n",
        "        tenant_id=tenant_id, # tenant id of service principal\n",
        "        client_id=client_id, # client id of service principal\n",
        "        client_secret=client_st) # the secret of service principal\n",
        "    print(\"registered datastore with name: %s\" % datastore_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Set up inputs and outputs\n",
        "\n",
        "For the purpose of this demo, we'll execute a simple U-SQL script that reads a CSV file and writes a portion of its content to a new text file. First, let's create our sample input, which contains three columns: employee ID, name, and department ID."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# create a folder to store files for our job\n",
        "sample_folder = \"adla_sample\"\n",
        "\n",
        "# exist_ok=True makes this a no-op if the folder already exists\n",
        "os.makedirs(sample_folder, exist_ok=True)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile $sample_folder/sample_input.csv\n",
        "1, Noah, 100\n",
        "3, Liam, 100\n",
        "4, Emma, 100\n",
        "5, Jacob, 100\n",
        "7, Jennie, 100"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Upload this file to Data Lake Storage at `adla_sample/sample_input.csv`, then create a DataReference that refers to it."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "sample_input = DataReference(\n",
        "    datastore=adls_datastore,\n",
        "    data_reference_name=\"employee_data\",\n",
        "    path_on_datastore=\"adla_sample/sample_input.csv\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Create a PipelineData object to store the output produced by the AdlaStep."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "sample_output = PipelineData(\"sample_output\", datastore=adls_datastore)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Write your U-SQL script\n",
        "\n",
        "Now let's write a U-Sql script that reads above CSV file and writes the name column to a new file.\n",
        "\n",
        "Instead of hard-coding paths in your script, you can use `@@name@@` syntax to refer to inputs, outputs, and parameters.\n",
        "\n",
        "- If `name` is the name of an input or output port binding, any occurrences of `@@name@@` in the script are replaced with actual data path of corresponding port binding.\n",
        "- If `name` matches any key in the `params` dictionary, any occurrences of `@@name@@` will be replaced with corresponding value in the dictionary.\n",
        "\n",
        "Note the use of @@ syntax in the below script. Before submitting the job to Data Lake Analytics service, `@@emplyee_data@@` will be replaced with actual path of `sample_input.csv` in Data Lake Storage. Similarly, `@@sample_output@@` will be replaced with a path in Data Lake Storage which will be used to store intermediate output produced by the step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%%writefile $sample_folder/sample_script.usql\n",
        "\n",
        "// Read employee information from csv file\n",
        "@employees = \n",
        "    EXTRACT EmpId int, EmpName string, DeptId int\n",
        "    FROM \"@@employee_data@@\"\n",
        "    USING Extractors.Csv();\n",
        "\n",
        "// Export employee names to text file\n",
        "OUTPUT\n",
        "(\n",
        "    SELECT EmpName\n",
        "    FROM @employees\n",
        ")\n",
        "TO \"@@sample_output@@\"\n",
        "USING Outputters.Text();"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create an AdlaStep\n",
        "\n",
        "**[AdlaStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py)** runs a U-SQL script on Azure Data Lake Analytics.\n",
        "\n",
        "- **name:** Name of the step\n",
        "- **script_name:** Name of the U-SQL script file\n",
        "- **inputs:** List of input port bindings\n",
        "- **outputs:** List of output port bindings\n",
        "- **compute_target:** The ADLA compute to use for this job\n",
        "- **params:** Dictionary of name-value pairs to pass to the U-SQL job *(optional)*\n",
        "- **degree_of_parallelism:** The degree of parallelism to use for this job *(optional)*\n",
        "- **priority:** The priority value to use for the current job *(optional)*\n",
        "- **runtime_version:** The runtime version of the Data Lake Analytics engine *(optional)*\n",
        "- **source_directory:** Folder that contains the script, assemblies, etc. *(optional)*\n",
        "- **hash_paths:** List of paths to hash to detect a change (the script file is always hashed) *(optional)*\n",
        "\n",
        "The best practice is to use a separate folder for each step's script and dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Because a change to any file in the `source_directory` triggers a re-upload of the snapshot, this also preserves step reuse when nothing in the step's `source_directory` has changed."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "adlastep-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "adla_step = AdlaStep(\n",
        "    name='extract_employee_names',\n",
        "    script_name='sample_script.usql',\n",
        "    source_directory=sample_folder,\n",
        "    inputs=[sample_input],\n",
        "    outputs=[sample_output],\n",
        "    compute_target=adla_compute)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and Submit the Experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline = Pipeline(workspace=ws, steps=[adla_step])\n",
        "\n",
        "pipeline_run = Experiment(ws, 'adla_sample').submit(pipeline)\n",
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "Azure Data Lake Analytics"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to use AdlaStep with AML Pipelines",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.6"
    },
    "order_index": 6,
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of AdlaStep"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}